[kernel-1116] browser events: add external events#227
[kernel-1116] browser events: add external events#227archandatta wants to merge 76 commits intomainfrom
Conversation
…ies and fix delimiter in CategoryFor
… and category validation
…alloc in filewriter, fix session doc
This binary is tracked on main and was incidentally deleted earlier on this branch. Restoring it keeps the 13.4MB binary out of this PR's diff. Removing the tracked binary from main should be done in a separate PR.
Sayan-
left a comment
There was a problem hiding this comment.
the implementation is internally leaning toward “seq is per capture session” but the docs don’t clearly say that. would recommend:
- make seq process-wide by not resetting it, or
- keep the current behavior and document that seq / Last-Event-ID are scoped to the active capture session
| content: | ||
| application/json: | ||
| schema: | ||
| $ref: "#/components/schemas/Event" |
There was a problem hiding this comment.
I wouldn't $ref: Event for the request body. The server forces source.kind = kernel_api and silently defaults to system category. I think this source.kind should be reserved for when we produce events on API requests to the image server and that we should let event publishers define whatever source.kind/category they want. I would define PublishEventRequest with only the fields the server actually honors so SDK consumers don't see fields that get overwritten.
| $ref: "#/components/schemas/Event" | ||
| responses: | ||
| "200": | ||
| description: Event accepted and published |
There was a problem hiding this comment.
Empty 200 means callers can't correlate their POST to the stream. Return the assigned envelope
| session. Omit or send 0 to start from the oldest buffered event. | ||
| Sequence numbers reset to 0 when a new capture session is started; | ||
| a value from a previous session is not meaningful here and may | ||
| cause the stream to miss events silently. |
There was a problem hiding this comment.
Under the global-stream model, seqs are process-monotonic — they don't reset on session start. This warning ("a value from a previous session is not meaningful") becomes obsolete and the silent-miss footgun goes away.
| content: | ||
| text/event-stream: | ||
| schema: | ||
| type: string |
There was a problem hiding this comment.
is there no way to type this? what do we do in other sse streams here and in the API?
There was a problem hiding this comment.
good catch i missed updating this after creating the event structs 56782c0
| // the sequence counter so each session starts at seq 1. | ||
| // the sequence counter so each session starts at seq 1. Sequence numbers are | ||
| // scoped to the active session; Last-Event-ID values from a previous session | ||
| // are not valid for reconnecting to a new one. |
There was a problem hiding this comment.
i would hoist ring, seq, and the system-event publish responsibility out of CaptureSession into a process-lifetime EventStream (or EventBus). CaptureSession then holds the stream as a dependency and is just (a) category filter for file writes + (b) capture-preferences metadata. This is the structural change that makes the global-stream model real.
we may also want to just rm the file writing since in a global event stream model it is less clear what events go to these files
| if body.Ts != nil { | ||
| ev.Ts = *body.Ts | ||
| } | ||
| ev.Ts = time.Now().UnixMicro() |
There was a problem hiding this comment.
@raf any thoughts for this? I'm going back and forth if we should allow for the timestamp to be sent or not because then the request could completely miss align with the event
…p category description
| env, _ = truncateIfNeeded(env) | ||
| es.ring.publish(env) | ||
| return env | ||
| } |
There was a problem hiding this comment.
EventStream.publish releases lock before ring.publish risking out-of-order events
Medium Severity
EventStream.publish releases es.mu after assigning the seq but before calling truncateIfNeeded and ring.publish. If two goroutines call publish concurrently (possible since EventStream is a shared process-lifetime bus accepting any *EventStream pointer), events can arrive at the ring buffer out of order. The ring buffer unconditionally sets latestSeq = env.Seq in its own publish, so a late-arriving lower seq would regress latestSeq, causing readers to block on already-buffered events or miss them entirely. The lock scope needs to cover through ring.publish to maintain the seq-order invariant the readers depend on.
Reviewed by Cursor Bugbot for commit 678df70. Configure here.
rgarcia
left a comment
There was a problem hiding this comment.
some things still outstanding in terms of decoupling the publish/subscribe API from the capture session:
- PublishEvent 400s when no capture session active (events.go:21-23)
- StreamEvents 400s when no capture session active (events.go:82-84)
- SSE stream terminates on
session_ended(events.go:145-147)
Also nit but we should rename the capture session events to capture_session otherwise we risk confusion with the browser session itself beginning/ending
External publish should be treated like a peer producer to the cdp producer, so I think instead of Publish and PublishUnfiltered living on CaptureSession it should be something like
// EventStream gets a public Publish (today's es.publish is private)
func (es *EventStream) Publish(env Envelope) Envelope
// CaptureSession.Publish stays, applies its own config-driven filter,
// internally calls es.Publish
func (s *CaptureSession) Publish(ev Event)
// PublishUnfiltered goes away
Also it looks like capture session id is still a top level field on the event envelope.
// today
type Envelope struct {
CaptureSessionID string // producer detail leaking up
Seq uint64 // actual pipeline metadata
Event Event
}
In a global-event stream model I think it should be
type Envelope struct {
Seq uint64
Event Event
}
type Source struct {
Kind SourceKind
Event string
Metadata map[string]string // capture session id can go here as a producer detail
}
also the SSE stream data and response to POST /events should be identical to the envelope i.e. {seq: ..., event: ...}
4be4ea3 to
678df70
Compare
…act CaptureSession to lib/capturesession
…ped → EventsDropped
rgarcia
left a comment
There was a problem hiding this comment.
-
let's drop events_dropped and capture_session_ended as synthetic frames on the stream. Both can be inferred by clients without server-side synthesis. Removing them simplifies the wire format (one frame shape, no seq=0 sentinel, no omitted-id: carve-out) and stops mixing system control signals with user payload events.
-
PublishedEnvelope.seqopenapi.yaml description still says "assigned to this event within the current capture session" — stale now that seqs are process-monotonic and the stream isn't session-scoped. Reword to "process-monotonic across the lifetime of the server."
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 808f9db. Configure here.
rgarcia
left a comment
There was a problem hiding this comment.
LGTM — global event stream model is fully realized. Bus is decoupled from session lifecycle, capture session is one producer among many, schemas are clean across POST and SSE, and the wire format is uniform now that the synthetic control frames are gone. Drop-detection contract is documented for SDK consumers. Ship it.


As per plan -> #217
Note
Medium Risk
Introduces new public event ingestion/streaming endpoints and refactors the core event/capture-session pipeline (seq semantics and envelope shape), which could impact existing consumers and event durability/ordering behavior.
Overview
Adds external event support via new API endpoints:
POST /events/publishto inject validated events into the server’s event bus andGET /events/streamto consume envelopes over SSE withLast-Event-IDresume and keepalives.Refactors the event pipeline by splitting responsibilities into a process-wide
events.EventStream(ring buffer + process-monotonic sequence) and a newcapturesession.CaptureSession(session lifecycle + category filtering + stampingcapture_session_idintoevent.source.metadata), removing the oldevents.CaptureSessionand file-writing path. The OpenAPI-generated types/handlers are updated accordingly (new schemas, renamed capture category enums, SSE responses now setCache-Control: no-cacheandX-Accel-Buffering: no), along with build tooling tweaks to patch the additional SSE method.Reviewed by Cursor Bugbot for commit bb38112. Bugbot is set up for automated code reviews on this repo. Configure here.